RocketMQ Connect 快速入门
快速入门
本教程将以独立模式启动 RocketMQ Connector 示例项目“rocketmq-connect-sample”,帮助您理解连接器的运行原理。示例项目提供了一个源连接器,它从源文件读取数据并将其发送到 RocketMQ 集群。它还提供了一个接收连接器,它从 RocketMQ 集群读取消息并将它们写入目标文件。
1. 准备:启动 RocketMQ
- Linux/Unix/Mac
- 64 位 JDK 1.8+
- Maven 3.2.x+
- 启动 RocketMQ。可以使用 RocketMQ 4.x 或 RocketMQ 5.x 5.x 版本。
- 使用工具测试 RocketMQ 消息发送和接收。
这里使用环境变量 NAMESRV_ADDR 通知工具客户端 RocketMQ 的 NameServer 地址,例如 localhost:9876。
#$ cd distribution/target/rocketmq-4.9.7/rocketmq-4.9.7
$ cd distribution/target/rocketmq-5.1.4/rocketmq-5.1.4
$ export NAMESRV_ADDR=localhost:9876
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
$ sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
注意:RocketMQ 具有自动创建主题和组的功能。在发送或订阅消息时,如果相应的主题或组不存在,RocketMQ 将自动创建它们。因此,无需提前创建主题和组。
2. 构建连接器运行时
git clone https://github.com/apache/rocketmq-connect.git
cd rocketmq-connect
export RMQ_CONNECT_HOME=`pwd`
mvn -Prelease-connect -Dmaven.test.skip=true clean install -U
注意:该项目默认包含 rocketmq-connect-sample 的代码,因此无需单独构建 rocketmq-connect-sample 插件。
3. 以独立模式运行连接器工作器
修改配置
修改 connect-standalone.conf
文件以配置 RocketMQ 连接地址和其他信息。有关详细信息,请参阅 9. 配置文件说明。
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
vim conf/connect-standalone.conf
在独立模式下,RocketMQ Connect 将同步检查点信息持久化到本地文件目录 storePathRootDir。
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
如果要重置同步检查点,则需要删除持久化的检查点文件。
rm -rf /Users/YourUsername/rocketmqconnect/storeRoot/*
以独立模式启动连接器工作器
sh bin/connect-standalone.sh -c conf/connect-standalone.conf &
提示:可以修改 docker/connect/bin/runconnect.sh
以根据需要调整 JVM 启动参数。
JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m"
查看启动日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果运行时启动成功,您将在日志文件中看到以下打印内容
The standalone worker boot success.
要退出 tail -f
命令的日志跟踪模式,可以按 Ctrl + C
键组合。
4. 启动源连接器
创建源文件并写入测试数据
mkdir -p /Users/YourUsername/rocketmqconnect/
cd /Users/YourUsername/rocketmqconnect/
touch test-source-file.txt
echo "Hello \r\nRocketMQ\r\n Connect" >> test-source-file.txt
注意:源文件中不应该有空行(如果遇到空行,演示程序将抛出错误)。源连接器将持续读取源文件,并将每行数据转换为消息体,发送到 RocketMQ 供接收连接器消费。
启动源连接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSourceConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSourceConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-source-file.txt",
"connect.topicname": "fileTopic"
}'
如果 curl 请求返回状态 200,则表示创建成功。示例响应
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSourceConnector","filename":"/Users/YourUsername/rocketmqconnect/test-source-file.txt","connect.topicname":"fileTopic"}}
查看日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日志,则表示文件源连接器已成功启动
Start connector fileSourceConnector and set target state STARTED successed!!
源连接器配置说明
键 | 可空 | 默认值 | 描述 |
---|---|---|---|
connector.class | false | 实现 Connector 接口的类名(包括包名) | |
filename | false | 源文件名(建议使用绝对路径) | |
connect.topicname | false | 同步文件数据所需的主题 |
5. 启动接收连接器
curl -X POST -H "Content-Type: application/json" http://127.0.0.1:8082/connectors/fileSinkConnector -d '{
"connector.class": "org.apache.rocketmq.connect.file.FileSinkConnector",
"filename": "/Users/YourUsername/rocketmqconnect/test-sink-file.txt",
"connect.topicnames": "fileTopic"
}'
如果 curl 请求返回状态 200,则表示创建成功。示例响应
{"status":200,"body":{"connector.class":"org.apache.rocketmq.connect.file.FileSinkConnector","filename":"/Users/YourUsername/rocketmqconnect/test-sink-file.txt","connect.topicnames":"fileTopic"}}
查看日志文件
tail -100f ~/logs/rocketmqconnect/connect_runtime.log
如果您看到以下日志,则表示文件接收连接器已成功启动
Start connector fileSinkConnector and set target state STARTED successed!!
检查接收连接器是否已将数据写入目标文件
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
如果生成了 test-sink-file.txt 文件,并且其内容与 test-source-file.txt 相同,则表示整个过程运行正常。
继续将测试数据写入源文件 test-source-file.txt
cd /Users/YourUsername/rocketmqconnect/
echo "Say Hi to\r\nRMQ Connector\r\nAgain" >> test-source-file.txt
# Wait a few seconds, check if rocketmq-connect replicate data to sink file succeed
sleep 10
cat /Users/YourUsername/rocketmqconnect/test-sink-file.txt
注意:文件内容的顺序可能会有所不同,因为 rocketmq-connect-sample
在向 RocketMQ 主题发送和接收消息时使用的是 普通消息
。这与 有序消息
不同,消费 普通消息
并不保证顺序。
接收连接器配置说明
键 | 可空 | 默认值 | 描述 |
---|---|---|---|
connector.class | false | 实现 Connector 接口的类名(包括包名) | |
filename | false | 接收连接器拉取数据并将其保存到文件(建议使用绝对路径) | |
connect.topicnames | false | 接收连接器需要处理的数据消息的主题 |
提示:示例 rocketmq-connect-sample 的配置文件说明仅供参考,不同的源/接收连接器具有不同的配置,请参考具体的源/接收连接器。
6. 停止连接器
停止连接器的 RESTful 命令格式为 http://(您的工作器 IP):(端口)/connectors/(连接器名称)/stop
要停止演示中的两个连接器,可以使用以下命令
curl http://127.0.0.1:8082/connectors/fileSinkConnector/stop
curl http://127.0.0.1:8082/connectors/fileSourceConnector/stop
如果 curl 请求返回状态 200,则表示成功停止连接器。示例响应
{"status":200,"body":"Connector[fileSinkConnector]deleted successfully"}
如果您看到以下日志消息,则表示文件接收连接器已成功关闭
tail -100f ~/logs/rocketmqconnect/connect_default.log
Completed shutdown for connectorName:fileSinkConnector
7. 停止工作器进程
cd $RMQ_CONNECT_HOME/distribution/target/rocketmq-connect-0.0.1-SNAPSHOT/rocketmq-connect-0.0.1-SNAPSHOT
sh bin/connectshutdown.sh
8. 日志目录
可以使用以下命令查看日志目录
ls $HOME/logs/rocketmqconnect
ls ~/logs/rocketmqconnect
9. 配置文件说明
根据您的使用情况修改 RESTful 端口、storeRoot 路径、NameServer 地址和其他信息。
以下是一个配置文件示例
#current cluster node uniquely identifies
workerId=DEFAULT_WORKER_1
# Http prot for user to access REST API
httpPort=8082
# Local file dir for config store
storePathRootDir=/Users/YourUsername/rocketmqconnect/storeRoot
#You need to modify it to your own rocketmq nameserver endpoint.
# RocketMQ namesrvAddr
namesrvAddr=127.0.0.1:9876
# Plugin path for loading Source/Sink Connectors
# The rocketmq-connect project already includes the rocketmq-connect-sample module by default, so no configuration is needed here.
pluginPaths=
storePathRootDir 配置说明
在独立模式下,RocketMQ Connect 将同步检查点信息持久化到 storePathRootDir 指定的本地文件目录。持久化文件包括
键 | 描述 |
---|---|
connectorConfig.json | 连接器配置持久化文件 |
position.json | 源连接器数据处理进度持久化文件 |
taskConfig.json | 任务配置持久化文件 |
offset.json | 接收连接器数据消费进度持久化文件 |
connectorStatus.json | 连接器状态持久化文件 |
taskStatus.json | 任务状态持久化文件 |